package com.gzyj.flink.gps.map;

import com.gzyj.flink.gps.GPSPosition;
import com.gzyj.flink.veh.AnbiaoVehicle;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Flink map function that looks up the vehicle belonging to a {@link GPSPosition} in the
 * {@code anbiao_vehicle} MySQL table and copies a handful of vehicle attributes onto the position.
 *
 * <p>Lookup order:
 * <ol>
 *   <li>by terminal number ({@code zongduanid}) when the position carries a terminal phone number;</li>
 *   <li>otherwise by licence plate and plate colour when at least one of them is present.</li>
 * </ol>
 * Only rows with {@code is_deleted = 0} or {@code is_deleted IS NULL} are considered, and only the
 * first matching row is used.
 *
 * <p>The JDBC connection settings are read in {@link #open(Configuration)} from the global job
 * parameters under the keys {@code mysql-url}, {@code mysql-user} and {@code mysql-password}.
 */
public class IsInDatabase extends RichMapFunction<GPSPosition, GPSPosition> {

    /** Lookup by terminal number; parameter 1 = terminal phone number. */
    private static final String SQL_BY_TERMINAL =
            "select * from anbiao_vehicle where zongduanid=? and  (is_deleted=0 or is_deleted is null)";

    /** Lookup by plate; parameter 1 = licence plate, parameter 2 = plate colour. */
    private static final String SQL_BY_PLATE =
            "select * from anbiao_vehicle where (cheliangpaizhao=? and chepaiyanse=?) and  (is_deleted=0 or is_deleted is null)";

    // JDBC handles are not serializable and are created per task in open(), hence transient.
    private transient Connection connection;
    private transient PreparedStatement terminalLookup;
    private transient PreparedStatement plateLookup;

    /**
     * Opens the MySQL connection and prepares both lookup statements.
     *
     * @param parameters configuration passed in by the Flink runtime
     * @throws Exception if the driver cannot be loaded, the global job parameters are not a
     *                   {@link Configuration}, or the connection/statements cannot be created.
     *                   The failure is propagated (instead of being printed and ignored) so the
     *                   task fails at start-up rather than on the first record.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        Class.forName("com.mysql.cj.jdbc.Driver");
        ExecutionConfig.GlobalJobParameters globalJobParam = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Configuration globconf = (Configuration) globalJobParam;

        try {
            this.connection = DriverManager.getConnection(globconf.getString("mysql-url", null),
                    globconf.getString("mysql-user", "root"),
                    globconf.getString("mysql-password", "123456"));
            this.terminalLookup = prepareLookup(SQL_BY_TERMINAL);
            this.plateLookup = prepareLookup(SQL_BY_PLATE);
        } catch (Exception e) {
            // Do not leave a half-initialised function behind: release what was acquired, then fail.
            try {
                releaseJdbcResources();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Enriches the given position with vehicle data.
     *
     * @param gpsPosition the incoming position; it is modified in place
     * @return the same {@code gpsPosition} instance (enriched when a vehicle row was found), or
     *         {@code null} when the position has neither a terminal phone number nor a plate /
     *         plate colour to look up by
     * @throws Exception if the database query fails
     */
    @Override
    public GPSPosition map(GPSPosition gpsPosition) throws Exception {
        if (StringUtils.isNotEmpty(gpsPosition.getTerminalPhoneNo())) {
            // NOTE(review): the value is now bound as a string, so the match is an exact string
            // comparison. The previous unquoted literal let MySQL compare numerically; confirm that
            // stored zongduanid values use the same formatting (e.g. leading zeros) as the stream.
            this.terminalLookup.setString(1, gpsPosition.getTerminalPhoneNo());
            applyVehicle(gpsPosition, queryFirstVehicle(this.terminalLookup));
            return gpsPosition;
        }

        if (StringUtils.isNotEmpty(gpsPosition.getCheliangpaizhao()) || StringUtils.isNotEmpty(gpsPosition.getChepaiyanse())) {
            // When only one of the two values is present the query simply matches no row and the
            // position is returned unchanged.
            this.plateLookup.setString(1, gpsPosition.getCheliangpaizhao());
            this.plateLookup.setString(2, gpsPosition.getChepaiyanse());
            applyVehicle(gpsPosition, queryFirstVehicle(this.plateLookup));
            return gpsPosition;
        }

        return null;
    }

    /**
     * Closes the prepared statements and the connection opened in {@link #open(Configuration)}.
     *
     * @throws Exception if releasing a JDBC resource or the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            releaseJdbcResources();
        } finally {
            super.close();
        }
    }

    /** Prepares a forward-only, read-only statement that returns at most one row. */
    private PreparedStatement prepareLookup(String sql) throws Exception {
        PreparedStatement statement = this.connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // Only the first row is ever consumed, so there is no reason to fetch more.
        statement.setMaxRows(1);
        return statement;
    }

    /** Executes an already-bound lookup statement and maps its first row, closing the result set. */
    private AnbiaoVehicle queryFirstVehicle(PreparedStatement statement) throws Exception {
        try (ResultSet rs = statement.executeQuery()) {
            return this.processRS(rs);
        }
    }

    /** Copies the vehicle attributes used downstream onto the position; no-op when no vehicle was found. */
    private static void applyVehicle(GPSPosition gpsPosition, AnbiaoVehicle vehicle) {
        if (vehicle == null) {
            return;
        }
        gpsPosition.setVehId(vehicle.getId());
        gpsPosition.setCheliangpaizhao(vehicle.getCheliangpaizhao());
        gpsPosition.setChepaiyanse(vehicle.getChepaiyanse());
        gpsPosition.setDeptId(vehicle.getDeptId());
        gpsPosition.setShiyongxingzhi(vehicle.getShiyongxingzhi());
        gpsPosition.setCheliangzhuangtai(vehicle.getCheliangzhuangtai());
    }

    /** Closes statements before the connection, attempting every close even if an earlier one fails. */
    private void releaseJdbcResources() throws Exception {
        AutoCloseable[] resources = {this.terminalLookup, this.plateLookup, this.connection};
        this.terminalLookup = null;
        this.plateLookup = null;
        this.connection = null;

        Exception firstFailure = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Maps the first row of the result set to an {@link AnbiaoVehicle}.
     *
     * @param rs result set of a {@code select * from anbiao_vehicle} query, positioned before the first row
     * @return the mapped vehicle, or {@code null} when the result set is empty
     * @throws Exception if a column cannot be read
     */
    private AnbiaoVehicle processRS(ResultSet rs) throws Exception {
        if (!rs.next()) {
            return null;
        }

        AnbiaoVehicle anbiaoVehicle = new AnbiaoVehicle();

        anbiaoVehicle.setId(rs.getString("id"));
        anbiaoVehicle.setDeptId(rs.getLong("dept_id"));
        anbiaoVehicle.setCheliangpaizhao(rs.getString("cheliangpaizhao"));
        anbiaoVehicle.setChepaiyanse(rs.getString("chepaiyanse"));
        anbiaoVehicle.setShiyongxingzhi(rs.getString("shiyongxingzhi"));
        anbiaoVehicle.setJiashiyuanid(rs.getString("jiashiyuanid"));
        anbiaoVehicle.setChangpai(rs.getString("changpai"));
        anbiaoVehicle.setXinghao(rs.getString("xinghao"));
        anbiaoVehicle.setChejiahao(rs.getString("chejiahao"));
        anbiaoVehicle.setLuntaiguige(rs.getString("luntaiguige"));
        anbiaoVehicle.setCheshenyanse(rs.getString("cheshenyanse"));
        anbiaoVehicle.setHedingzaike(rs.getString("hedingzaike"));
        anbiaoVehicle.setYingyunnianxian(rs.getString("yingyunnianxian"));
        anbiaoVehicle.setDengjizhengshubianhao(rs.getString("dengjizhengshubianhao"));
        anbiaoVehicle.setChelianglaiyuan(rs.getString("chelianglaiyuan"));
        anbiaoVehicle.setZhucedengjishijian(rs.getString("zhucedengjishijian"));
        anbiaoVehicle.setRuhushijian(rs.getString("ruhushijian"));
        anbiaoVehicle.setGuohushijian(rs.getString("guohushijian"));
        anbiaoVehicle.setTuishishijian(rs.getString("tuishishijian"));
        anbiaoVehicle.setQiangzhibaofeishijian(rs.getString("qiangzhibaofeishijian"));
        anbiaoVehicle.setJieboyunshuzhenghao(rs.getString("jieboyunshuzhenghao"));
        anbiaoVehicle.setYuancheliangpaizhao(rs.getString("yuancheliangpaizhao"));
        anbiaoVehicle.setCheliangzhuangtai(rs.getInt("cheliangzhuangtai"));
        anbiaoVehicle.setCheliangtingfangdiqu(rs.getString("cheliangtingfangdiqu"));
        anbiaoVehicle.setDanganhao(rs.getString("danganhao"));
        anbiaoVehicle.setBeiyongcheliang(rs.getString("beiyongcheliang"));
        anbiaoVehicle.setYunyingshang(rs.getString("yunyingshang"));
        anbiaoVehicle.setSuoshuchedui(rs.getString("suoshuchedui"));
        anbiaoVehicle.setXingshifujian(rs.getString("xingshifujian"));
        anbiaoVehicle.setFujian(rs.getString("fujian"));
        anbiaoVehicle.setFadongjixinghao(rs.getString("fadongjixinghao"));
        anbiaoVehicle.setFadongjihao(rs.getString("fadongjihao"));
        anbiaoVehicle.setFadongjipailianggonglv(rs.getString("fadongjipailianggonglv"));
        anbiaoVehicle.setRanliaoleibie(rs.getString("ranliaoleibie"));
        anbiaoVehicle.setRanyouxiaohao(rs.getString("ranyouxiaohao"));
        anbiaoVehicle.setPaifangbiaozhun(rs.getString("paifangbiaozhun"));
        anbiaoVehicle.setZhuanxiangfangshi(rs.getString("zhuanxiangfangshi"));
        anbiaoVehicle.setChemenshezhi(rs.getString("chemenshezhi"));
        anbiaoVehicle.setZhouju(rs.getString("zhouju"));
        anbiaoVehicle.setChechang(rs.getString("chechang"));
        anbiaoVehicle.setChekuan(rs.getString("chekuan"));
        anbiaoVehicle.setChegao(rs.getString("chegao"));
        anbiaoVehicle.setLuntaishu(rs.getString("luntaishu"));
        anbiaoVehicle.setChezhoushu(rs.getString("chezhoushu"));
        anbiaoVehicle.setGangbantanhuangpianshu(rs.getString("gangbantanhuangpianshu"));
        anbiaoVehicle.setDipanxinghao(rs.getString("dipanxinghao"));
        anbiaoVehicle.setDonglileixing(rs.getString("donglileixing"));
        anbiaoVehicle.setZongzhiliang(rs.getString("zongzhiliang"));
        anbiaoVehicle.setZhengbeizhiliang(rs.getString("zhengbeizhiliang"));
        anbiaoVehicle.setLuntaizonglei(rs.getString("luntaizonglei"));
        anbiaoVehicle.setXuanguaxingshi(rs.getString("xuanguaxingshi"));
        anbiaoVehicle.setXingchezhidongfangshi(rs.getString("xingchezhidongfangshi"));
        anbiaoVehicle.setZhidongqiqianlun(rs.getString("zhidongqiqianlun"));
        anbiaoVehicle.setZhidongqihoulun(rs.getString("zhidongqihoulun"));
        anbiaoVehicle.setAbs(rs.getString("abs"));
        anbiaoVehicle.setKongtiaoxitong(rs.getString("kongtiaoxitong"));
        anbiaoVehicle.setHuanshuqi(rs.getString("huanshuqi"));
        anbiaoVehicle.setBiansuxiangxingshi(rs.getString("biansuxiangxingshi"));
        anbiaoVehicle.setZhizhaochangshang(rs.getString("zhizhaochangshang"));
        anbiaoVehicle.setGouzhishuizhenghao(rs.getString("gouzhishuizhenghao"));
        anbiaoVehicle.setChuchangriqi(rs.getString("chuchangriqi"));
        anbiaoVehicle.setLeijilicheng(rs.getString("leijilicheng"));
        anbiaoVehicle.setZhongduanfuwuqi(rs.getString("zhongduanfuwuqi"));
        anbiaoVehicle.setCheliangdengji(rs.getString("cheliangdengji"));
        anbiaoVehicle.setWeishengjian(rs.getString("weishengjian"));
        anbiaoVehicle.setFadongjipailiang(rs.getString("fadongjipailiang"));
        anbiaoVehicle.setCheliangwaikuochicun(rs.getString("cheliangwaikuochicun"));
        anbiaoVehicle.setRanliaoxiaohaofujian(rs.getString("ranliaoxiaohaofujian"));
        anbiaoVehicle.setBeizhu(rs.getString("beizhu"));
        anbiaoVehicle.setGpsanzhuangshijian(rs.getString("gpsanzhuangshijian"));
        anbiaoVehicle.setZhinenghuaxitong(rs.getString("zhinenghuaxitong"));
        anbiaoVehicle.setGps(rs.getString("gps"));
        anbiaoVehicle.setXingshijiluyi(rs.getString("xingshijiluyi"));
        anbiaoVehicle.setZongduanid(rs.getString("zongduanid"));
        anbiaoVehicle.setZongduanxinghao(rs.getString("zongduanxinghao"));
        anbiaoVehicle.setIsDeleted(rs.getInt("is_deleted"));
        anbiaoVehicle.setCreatetime(rs.getDate("createtime"));
        anbiaoVehicle.setCaozuoren(rs.getString("caozuoren"));
        anbiaoVehicle.setCaozuorenid(rs.getLong("caozuorenid"));
        anbiaoVehicle.setCaozuoshijian(rs.getDate("caozuoshijian"));
        anbiaoVehicle.setCheliangzhaopian(rs.getString("cheliangzhaopian"));
        anbiaoVehicle.setYunshujiezhi(rs.getString("yunshujiezhi"));
        anbiaoVehicle.setCheliangyunyingleixing(rs.getString("cheliangyunyingleixing"));
        anbiaoVehicle.setZhongduanchangshang(rs.getString("zhongduanchangshang"));
        anbiaoVehicle.setSimnum(rs.getString("simnum"));
        anbiaoVehicle.setTongbushijian(rs.getDate("tongbushijian"));
        anbiaoVehicle.setYunyingshangmingcheng(rs.getString("yunyingshangmingcheng"));
        anbiaoVehicle.setJiashiyuanxingming(rs.getString("jiashiyuanxingming"));
        anbiaoVehicle.setJiashiyuandianhua(rs.getString("jiashiyuandianhua"));
        anbiaoVehicle.setYayunyuanxingming(rs.getString("yayunyuanxingming"));
        anbiaoVehicle.setYayunyuandianhua(rs.getString("yayunyuandianhua"));
        anbiaoVehicle.setChezhu(rs.getString("chezhu"));
        anbiaoVehicle.setChezhudianhua(rs.getString("chezhudianhua"));
        anbiaoVehicle.setZhongduanleixing(rs.getInt("zhongduanleixing"));
        anbiaoVehicle.setYayunyuanid(rs.getString("yayunyuanid"));
        anbiaoVehicle.setTerminalprotocoltype(rs.getString("terminalprotocoltype"));
        anbiaoVehicle.setVideochannelnum(rs.getString("videochannelnum"));
        anbiaoVehicle.setPlatformconnectiontype(rs.getString("platformconnectiontype"));
        anbiaoVehicle.setTeamno(rs.getString("teamno"));
        anbiaoVehicle.setOwenno(rs.getString("owenno"));
        anbiaoVehicle.setDaoluyunshuzheng(rs.getString("daoluyunshuzheng"));
        anbiaoVehicle.setDaoluyunshuzhengchulingriqi(rs.getString("daoluyunshuzhengchulingriqi"));
        anbiaoVehicle.setDaoluyunshuzhengyouxiaoqi(rs.getString("daoluyunshuzhengyouxiaoqi"));
        anbiaoVehicle.setBencinianshenriqi(rs.getString("bencinianshenriqi"));
        anbiaoVehicle.setXiacinianshenriqi(rs.getString("xiacinianshenriqi"));
        anbiaoVehicle.setBencinianjianriqi(rs.getString("bencinianjianriqi"));
        anbiaoVehicle.setXiacinianjianriqi(rs.getString("xiacinianjianriqi"));
        anbiaoVehicle.setBencijipingriqi(rs.getString("bencijipingriqi"));
        anbiaoVehicle.setXiacijipingriqi(rs.getString("xiacijipingriqi"));
        anbiaoVehicle.setBaofeiriqi(rs.getString("baofeiriqi"));
        anbiaoVehicle.setCheliangjishudengji(rs.getString("cheliangjishudengji"));
        anbiaoVehicle.setSuoshuyunguan(rs.getString("suoshuyunguan"));
        anbiaoVehicle.setArea(rs.getString("area"));
        anbiaoVehicle.setDaoluyunzhengfujian(rs.getString("daoluyunzhengfujian"));
        anbiaoVehicle.setDabiaojianchafujian(rs.getString("dabiaojianchafujian"));
        anbiaoVehicle.setNianshenfujian(rs.getString("nianshenfujian"));
        anbiaoVehicle.setXingshizhengzhengyefujian(rs.getString("xingshizhengzhengyefujian"));
        anbiaoVehicle.setXingshizhengfuyefujian(rs.getString("xingshizhengfuyefujian"));
        anbiaoVehicle.setCheliangdengjizhengshuzhengyefujian(rs.getString("cheliangdengjizhengshuzhengyefujian"));
        anbiaoVehicle.setCheliangdengjizhengshufuyefujian(rs.getString("cheliangdengjizhengshufuyefujian"));
        anbiaoVehicle.setBaoxiandaoqishijian(rs.getString("baoxiandaoqishijian"));
        anbiaoVehicle.setXingshizhengdaoqishijian(rs.getString("xingshizhengdaoqishijian"));
        anbiaoVehicle.setCarowneraddress(rs.getString("carowneraddress"));
        anbiaoVehicle.setZhunqianyinzongzhiliang(rs.getString("zhunqianyinzongzhiliang"));
        anbiaoVehicle.setHedingzaizhiliang(rs.getString("hedingzaizhiliang"));
        anbiaoVehicle.setHuoxiangneibuchicun(rs.getString("huoxiangneibuchicun"));
        anbiaoVehicle.setYunyingshangjieruma(rs.getString("yunyingshangjieruma"));

        return anbiaoVehicle;
    }
}
